package xiaoa.java.jms.activeMQ.consumer;

import java.io.IOException;


import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.transport.TransportListener;

import xiaoa.java.log.L;


/**
 * Simple ActiveMQ queue-consumer factory.
 *
 * <p>On construction it opens one default connection and one default session
 * against the broker at the given URL. {@link #addConsumer} then attaches
 * message listeners to named queues, either on the default connection or on a
 * freshly created one. All sessions are non-transacted and use
 * {@link Session#CLIENT_ACKNOWLEDGE}; a message is acknowledged only when the
 * handler returns {@code true}.
 *
 * <p>Subclasses may override {@link #onConnectionException(Throwable)} and
 * {@link #onHandleException(Throwable, Message)} to react to failures; both
 * are no-ops here.
 *
 * <p>NOTE(review): nothing in this class closes consumers, sessions or
 * connections, and connections created with {@code newConnection = true} are
 * not retained anywhere, so they cannot be closed through this class.
 *
 * @author xiaoa
 * @date 2017-03-19 18:21:16
 * @version V1.0
 */
public class SimpleConsumerFactory {
	
	/**
	 * Connection factory built from {@link #url}.
	 */
	private final ActiveMQConnectionFactory  connectionFactory;
	
	/**
	 * Default connection, created and started in the constructor.
	 */
	private final ActiveMQConnection  defaultConnection;
	
	/**
	 * Default session, created on the default connection in the constructor.
	 */
	private final Session  defaultSession;
	
	/**
	 * Broker URL passed to the constructor.
	 */
	private final String  url;
	
	/**
	 * Registered consumers, keyed by queue name.
	 */
	private final Map<String, List<MessageConsumer>>  consumerMap  = new HashMap<String, List<MessageConsumer>>();
	
	/**
	 * Set to {@code true} as the last step of the constructor; checked by
	 * {@link #addConsumer(IMQMessageHandle, String, int, boolean)}.
	 */
	private boolean init = false;
	
	/**
	 * Creates the factory, opens and starts the default connection, registers
	 * a transport listener on it and creates the default session.
	 *
	 * @param url broker URL handed to {@link ActiveMQConnectionFactory}
	 * @throws IllegalArgumentException if {@code url} is {@code null}
	 * @throws Throwable anything thrown while creating or starting the
	 *         connection or creating the session
	 * @author xiaoa
	 */
	public   SimpleConsumerFactory(String url) throws Throwable {
		
		if (url == null){
			throw new IllegalArgumentException("url is null");
		}
		
		this.url  = url;
		
		// Build the connection factory for this broker URL.
		connectionFactory  = new ActiveMQConnectionFactory(this.url);
		
		// Obtain the default connection from the factory.
		defaultConnection  = (ActiveMQConnection) connectionFactory.createConnection();
		
		// Start the connection.
		defaultConnection.start();
		
		// Listen for transport-level events on the default connection.
		defaultConnection.addTransportListener(new simpleTransportListener());
		
		// Non-transacted session; messages are acknowledged explicitly by the listener.
		defaultSession  = defaultConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
		
		init = true;
		
	}
	
	
	/**
	 * Adds consumers for a queue on the default connection.
	 * Equivalent to {@code addConsumer(handle, queueName, consumerNum, false)}.
	 *
	 * @param handle      handler invoked for every received message
	 * @param queueName   name of the queue to consume from
	 * @param consumerNum if greater than zero, that many consumers are created,
	 *                    each on its own new session; otherwise a single
	 *                    consumer is created on the default session
	 * @throws Throwable see {@link #addConsumer(IMQMessageHandle, String, int, boolean)}
	 * @author xiaoa
	 */
	public void addConsumer(IMQMessageHandle handle , String  queueName , int consumerNum)throws Throwable{
		
		addConsumer(handle, queueName, consumerNum , false);
		
	}
	
	/**
	 * Adds consumers for a queue.
	 *
	 * <p>When {@code consumerNum} is greater than zero, that many consumers are
	 * created, each on its own new session of the chosen connection. Otherwise
	 * exactly one consumer is created: on the default session when the default
	 * connection is used, or on one new session of the new connection.
	 *
	 * @param handle        handler invoked for every received message
	 * @param queueName     name of the queue to consume from
	 * @param consumerNum   number of consumers/sessions to create (see above)
	 * @param newConnection {@code true} to create a dedicated connection for
	 *                      these consumers, {@code false} to use the default one
	 * @throws IllegalStateException    if the factory has not finished initializing
	 * @throws IllegalArgumentException if {@code handle} is {@code null} or
	 *                                  {@code queueName} is {@code null} or blank
	 * @throws Throwable anything thrown by the connection, session or consumer calls
	 * @author xiaoa
	 */
	public  void addConsumer(IMQMessageHandle handle , String  queueName , int consumerNum , boolean newConnection)throws Throwable{
		
		if (!init){
			throw new IllegalStateException("  uninitialized  ");
		}
		
		if (handle == null){
			throw new IllegalArgumentException("handle is null");
		}
		
		if (queueName == null || queueName.trim().isEmpty()){
			throw new IllegalArgumentException("queueName is null  queueName = " + queueName);
		}
		
		// Connection the new consumers will live on.
		ActiveMQConnection  connection  = defaultConnection;
		
		if (newConnection){
			
			// Dedicated connection with its own transport listener.
			connection  = (ActiveMQConnection) connectionFactory.createConnection();
			connection.addTransportListener(new simpleTransportListener());
			
		}
		
		// Start the connection that is actually used. Previously only the default
		// connection was started here, so a freshly created connection stayed
		// stopped and its listeners never received messages.
		connection.start();
		
		if (consumerNum > 0){
			
			// One session per consumer.
			for (int i = 0 ; i < consumerNum ; i++){
				Session  session  = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
				registerConsumer(session, handle, queueName);
			}
			
			return;
		}
		
		// Single consumer: reuse the default session unless a new connection was requested.
		Session  session  = defaultSession;
		
		if (newConnection){
			session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
		}
		
		registerConsumer(session, handle, queueName);
		
	}
	
	
	/**
	 * Creates a consumer for the queue on the given session, attaches the
	 * message listener and records the consumer in {@link #consumerMap}.
	 *
	 * @param session   session to create the consumer on
	 * @param handle    handler wrapped by the message listener
	 * @param queueName queue to consume from; also the key in the consumer map
	 * @throws Throwable anything thrown by the session or consumer calls
	 */
	private void registerConsumer(Session session , IMQMessageHandle handle , String queueName) throws Throwable{
		
		MessageConsumer  consumer = session.createConsumer(session.createQueue(queueName));
		
		consumer.setMessageListener(new MessageListenerHandle(handle));
		
		List<MessageConsumer>  consumerList = consumerMap.get(queueName);
		
		// First consumer for this queue: create and store its list.
		if (consumerList == null){
			consumerList  = new ArrayList<MessageConsumer>();
			consumerMap.put(queueName, consumerList);
		}
		
		consumerList.add(consumer);
		
	}
	
	
	/**
	 * Message listener that delegates to an {@link IMQMessageHandle} and
	 * acknowledges the message when the handler reports success.
	 *
	 * @author xiaoa
	 * @date 2017-03-21 20:49:11
	 * @version V1.0
	 */
	private  class MessageListenerHandle implements MessageListener{
		
		/**
		 * Handler that processes each message.
		 */
		private final IMQMessageHandle handle;
	
		public MessageListenerHandle(IMQMessageHandle  handle ) {
			
			this.handle = handle;
		}
		

		/**
		 * Passes the message to the handler. If the handler returns
		 * {@code true} the message is acknowledged. If the handler returns
		 * {@code false}, or anything is thrown by the handler or by the
		 * acknowledge call, {@code onHandleException} is invoked; its
		 * throwable argument is {@code null} when nothing was thrown.
		 */
		@Override
		public void onMessage(Message message) {
			
			// Failure captured from the handler or from acknowledge(), if any.
			Throwable  failure  = null;
			
			// Whether the handler reported the message as successfully processed.
			boolean handled  = false;
			
			try {
				
				handled = handle.onMessage(message);
				
				// Acknowledge only after successful processing.
				if (handled){
					message.acknowledge();
				}
				
			} catch (Throwable e) {
				e.printStackTrace();
				failure = e;
			}
			
			// Processing did not complete: notify the failure hook.
			if (failure != null || !handled){
				 onHandleException(failure, message);
			}
			
		}
		
	}
	
	
	/**
	 * Transport listener registered on every connection this factory creates.
	 * Forwards transport errors to {@code onConnectionException}.
	 *
	 * @author xiaoa
	 * @date 2017-03-22 09:30:10
	 * @version V1.0
	 */
	public class simpleTransportListener implements TransportListener {

		/** Ignores incoming transport commands. */
		@Override
		public void onCommand(Object command) {
			
		}

		/** Prints the error (if non-null) and forwards it to {@code onConnectionException}. */
		@Override
		public void onException(IOException error) {
			
			if (error != null){
				error.printStackTrace();
			}
			
			// Notify the connection-failure hook.
			onConnectionException(error);
			
		}

		/** Logs that the transport was interrupted. */
		@Override
		public void transportInterupted() {
			L.info("===============>> 重连");
		}

		/** Does nothing when the transport resumes. */
		@Override
		public void transportResumed() {
			
		}
	}
	
	
	/**
	 * Hook invoked when a transport listener reports an exception on a
	 * connection. Does nothing by default; subclasses may override.
	 *
	 * @param e the reported error; may be {@code null}
	 * @author xiaoa
	 */
	public void onConnectionException( Throwable e ){
		
	}
	
	
	/**
	 * Hook invoked when message handling did not complete: the handler
	 * returned {@code false}, or the handler or the acknowledge call threw.
	 * Does nothing by default; subclasses may override.
	 *
	 * @param e       the throwable that was caught, or {@code null} if the
	 *                handler simply returned {@code false}
	 * @param message the message being processed
	 * @author xiaoa
	 */
	public void onHandleException( Throwable e ,  Message message){
		
	}
	
}
